blob: 1a8b3e605bb581dcef0fd09ed5548fc6b243dc0f [file] [log] [blame]
/*
* This file is part of the QuickServer library
* Copyright (C) QuickServer.org
*
* Use, modification, copying and distribution of this software is subject to
* the terms and conditions of the GNU Lesser General Public License.
* You should have received a copy of the GNU LGP License along with this
* library; if not, you can download a copy from <http://www.quickserver.org/>.
*
* For questions, suggestions, bug-reports, enhancement-requests etc.
* visit http://www.quickserver.org
*
*/
package org.quickserver.net.server.impl;
import org.quickserver.net.server.*;
import org.quickserver.net.*;
import org.quickserver.util.*;
import org.quickserver.util.io.*;
import java.io.*;
import java.net.*;
import java.util.*;
import java.util.logging.*;
import java.nio.*;
import java.nio.channels.*;
import javax.net.ssl.*;
public class NonBlockingClientHandler extends BasicClientHandler {
// Logger used for this class's internal diagnostics.
private static final Logger logger = Logger.getLogger(NonBlockingClientHandler.class.getName());
// Handler invoked from doWrite() once all pending buffers have been written; may be null.
protected ClientWriteHandler clientWriteHandler; //v1.4.5
// Channel this handler reads from; set in handleClient(), cleared in clean().
private SocketChannel socketChannel;
// Buffers holding data read from the channel; backs the ByteBufferInputStream
// created in updateInputOutputStreams().
protected ArrayList readByteBuffer = new ArrayList();
// Buffers holding data waiting to be written; backs the ByteBufferOutputStream
// created in updateInputOutputStreams().
protected ArrayList writeByteBuffer = new ArrayList();
// Cached key of socketChannel with the server's Selector (see getSelectionKey()).
protected SelectionKey selectionKey;
// Incremented in run(), decremented in returnThread().
protected volatile int threadAccessCount = 0;
// Set by checkReturnClientHandler(); once true, run() returns immediately.
protected volatile boolean willReturn;
// Set by closeConnection() while it waits for pending output to be written.
protected volatile boolean waitingForFinalWrite;
// Upper bound checked in run(); -1 disables the check.
private static int maxThreadAccessCount = 5; //one for each event ACCEPT, WRITE, READ
// When true, registerWrite() wakes the Selector after adding OP_WRITE.
private static boolean wakeupSelectorAfterRegisterWrite = true;
// When true, registerForRead() wakes the Selector after adding OP_READ.
private static boolean wakeupSelectorAfterRegisterRead = true;
//nio ssl
//private final SSLSession session;
// True from beginHandshake() (updateInputOutputStreams) until finishInitialHandshake().
private boolean initialHandshakeStatus = false;
// Handshake status from the most recent SSLEngine operation.
private SSLEngineResult.HandshakeStatus handshakeStatus;
// Result status of the most recent unwrap in doRead().
private SSLEngineResult.Status status = null;
// Empty source buffer passed to SSLEngine.wrap() during handshake.
private ByteBuffer dummyByteBuffer = ByteBuffer.allocate(0);
// Buffer the channel is read into by doRead(); borrowed from the server's ByteBuffer pool.
private ByteBuffer peerNetData = null;
// Set by closeConnection() for secure clients; makes doWrite() run the SSL shutdown path.
private boolean sslShutdown = false;
/**
 * Controls whether the Selector is woken up after OP_WRITE has been
 * added to the interest set by {@link #registerWrite()}.
 * @param flag <code>true</code> to wake the Selector up
 * @since 1.4.7
 */
public static void setWakeupSelectorAfterRegisterWrite(boolean flag) {
    NonBlockingClientHandler.wakeupSelectorAfterRegisterWrite = flag;
}
/**
 * Tells whether the Selector is woken up after OP_WRITE has been
 * added to the interest set by {@link #registerWrite()}.
 * @return the current value of the flag
 * @since 1.4.7
 */
public static boolean getWakeupSelectorAfterRegisterWrite() {
    return NonBlockingClientHandler.wakeupSelectorAfterRegisterWrite;
}
/**
 * Controls whether the Selector is woken up after OP_READ has been
 * added to the interest set by {@link #registerForRead()}.
 * @param flag <code>true</code> to wake the Selector up
 * @since 1.4.7
 */
public static void setWakeupSelectorAfterRegisterRead(boolean flag) {
    NonBlockingClientHandler.wakeupSelectorAfterRegisterRead = flag;
}
/**
 * Tells whether the Selector is woken up after OP_READ has been
 * added to the interest set by {@link #registerForRead()}.
 * @return the current value of the flag
 * @since 1.4.7
 */
public static boolean getWakeupSelectorAfterRegisterRead() {
    return NonBlockingClientHandler.wakeupSelectorAfterRegisterRead;
}
/**
 * Sets how many threads may be inside a handler of this class at once.
 * @param count a value of 3 or more, or -1 to switch the limit check off
 * @throws IllegalArgumentException if count is below 3 and is not -1
 * @since 1.4.7
 */
public static void setMaxThreadAccessCount(int count) {
    final boolean unlimited = (count == -1);
    if(!unlimited && count < 3) {
        throw new IllegalArgumentException("Value should be >=3 or -1");
    }
    maxThreadAccessCount = count;
}
/**
 * Gives the limit on how many threads may be inside a handler of this
 * class at once; -1 means the limit check is switched off.
 * @return the configured limit
 * @since 1.4.7
 */
public static int getMaxThreadAccessCount() {
    return NonBlockingClientHandler.maxThreadAccessCount;
}
//v1.4.7
// Output stream built over writeByteBuffer in updateInputOutputStreams();
// closed and set back to null in clean().
private ByteBufferOutputStream byteBufferOutputStream;
/**
 * Creates a handler, handing the given instance count to the superclass.
 * @param instanceCount value forwarded unchanged to the super constructor
 */
public NonBlockingClientHandler(int instanceCount) {
    super(instanceCount);
}
/**
 * Creates a handler using the superclass's no-argument constructor.
 */
public NonBlockingClientHandler() {
    super();
}
/**
 * Releases everything this handler holds and resets its state: returns all
 * read/write buffers and the network read buffer to the server's ByteBuffer
 * pool, cancels the selection key, closes the output stream and then
 * delegates to <code>super.clean()</code>.
 * A non-zero thread access count at this point is logged as a warning (and
 * triggers the assertion exit when assertions are enabled) before being
 * forced back to 0.
 */
public void clean() {
    logger.log(Level.FINEST, "Starting clean - {0}", getName());
    if(threadAccessCount!=0) {
        logger.log(Level.WARNING, "Thread Access Count was not 0!: {0}", threadAccessCount);
        if(Assertion.isEnabled()) {
            assertionSystemExit();
        }
        threadAccessCount = 0;
    }

    while(readByteBuffer.isEmpty()==false) {
        try {
            getServer().getByteBufferPool().returnObject(
                readByteBuffer.remove(0));
        } catch(Exception er) {
            logger.log(Level.WARNING, "Error in returning read ByteBuffer to pool: "+er, er);
            break;
        }
    }

    while(writeByteBuffer.isEmpty()==false) {
        try {
            getServer().getByteBufferPool().returnObject(
                writeByteBuffer.remove(0));
        } catch(Exception er) {
            appLogger.log(Level.WARNING, "Error in returning write ByteBuffer to pool: "+er, er);
            break;
        }
    }

    if(peerNetData!=null) {
        try {
            getServer().getByteBufferPool().returnObject(peerNetData);
        } catch(Exception er) {
            appLogger.log(Level.WARNING, "Error in returning peerNetData to pool: "+er, er);
        }
        // Drop the reference: the buffer now belongs to the pool. Keeping it
        // would make the next doRead() read into a buffer we no longer own
        // and the next clean() return it a second time.
        peerNetData = null;
    }

    if(selectionKey!=null) {
        selectionKey.cancel();
        // wake the selector so the cancelled key is processed
        selectionKey.selector().wakeup();
        selectionKey = null;
    }
    willReturn = false;
    waitingForFinalWrite = false;
    socketChannel = null;
    if(byteBufferOutputStream!=null) {
        byteBufferOutputStream.close();
    }

    super.clean();

    clientWriteHandler = null;//1.4.5
    byteBufferOutputStream = null;
    sslShutdown = false;
    logger.log(Level.FINEST, "Finished clean - {0}", getName());
}
/**
 * Lets the superclass take over the client, then copies the client's
 * write handler and socket channel into this handler.
 * @param theClient the client to be handled
 * @throws Exception whatever the superclass implementation throws
 */
public void handleClient(TheClient theClient) throws Exception {
    super.handleClient(theClient);

    ClientWriteHandler writeHandler = theClient.getClientWriteHandler();
    setClientWriteHandler(writeHandler); //v1.4.5

    SocketChannel channel = theClient.getSocketChannel();
    setSocketChannel(channel); //1.4.5
}
/**
 * Stores the given stream as the raw input stream and resets the derived
 * readers according to the incoming data mode: OBJECT mode wraps the stream
 * in an ObjectInputStream, STRING/BYTE/BINARY modes clear every wrapper.
 * Any other mode leaves the wrappers untouched.
 * @param in the stream to read client data from
 * @throws IOException if the ObjectInputStream cannot be created
 */
protected void setInputStream(InputStream in) throws IOException {
    this.in = in;
    final DataMode mode = getDataMode(DataType.IN);

    if(mode == DataMode.OBJECT) {
        b_in = null;
        bufferedReader = null;
        o_in = new ObjectInputStream(in);
        return;
    }

    if(mode == DataMode.STRING || mode == DataMode.BYTE
            || mode == DataMode.BINARY) {
        b_in = null;
        o_in = null;
        bufferedReader = null;
    }
}
/**
 * Not supported by this handler.
 * @return never returns normally
 * @throws IllegalStateException always
 */
public BufferedReader getBufferedReader() {
    final String reason = "Access to BufferedReader in not allowed in Non-Blocking mode!";
    throw new IllegalStateException(reason);
}
/**
 * Closes the connection to the client.
 * If the selection key is still valid and the connection is not marked lost,
 * the method first waits (see waitTillFullyWritten()) for pending output;
 * otherwise the connection flag is cleared right away. For secure clients
 * the SSL shutdown is started and, while it is not finished, the method
 * returns and leaves the rest to doWrite(). IOException and
 * NullPointerException raised on the way are logged and not propagated.
 */
public void closeConnection() {
logger.finest("inside");
// Decide under the lock which close path to take; a second caller returns
// immediately once either flag has been changed.
synchronized(this) {
if(connection==false) return;
if(waitingForFinalWrite) return;
if(getSelectionKey()!=null && getSelectionKey().isValid() && lost == false) {
waitingForFinalWrite = true;
} else {
connection = false;
}
}
try {
if(getSocketChannel()!=null && socket!=null) {
if(waitingForFinalWrite) {
try {
waitTillFullyWritten();
} catch(Exception error) {
logger.warning("Error in waitingForFinalWrite : "+error);
if(logger.isLoggable(Level.FINE)) {
logger.fine("StackTrace:\n"+MyString.getStackTrace(error));
}
}
}//end of waitingForFinalWrite
if(isSecure()==true) {
// from now on doWrite() takes the SSL shutdown path
sslShutdown = true;
if(lost == false && sslEngine.isOutboundDone()==false) {
logger.finest("SSL isOutboundDone is false");
// shutdown not complete yet: doWrite() calls doPostCloseActivity() later
if(byteBufferOutputStream.doShutdown()==false) {
return;
}
} else if(sslEngine.isOutboundDone()) {
logger.finest("SSL Outbound is done.");
}
}
doPostCloseActivity();
}//if socket
} catch(IOException e) {
logger.warning("Error in closeConnection : "+e);
if(logger.isLoggable(Level.FINE)) {
logger.fine("StackTrace:\n"+MyString.getStackTrace(e));
}
} catch(NullPointerException npe) {
// NOTE(review): presumably covers fields already cleared by clean() - confirm
logger.fine("NullPointerException: "+npe);
if(logger.isLoggable(Level.FINE)) {
logger.fine("StackTrace:\n"+MyString.getStackTrace(npe));
}
}
}
/**
 * Final step of closing: clears the connection flag, releases anything
 * waiting on the output stream, cancels the selection key, wakes the
 * server's selector and, under this handler's lock, fires the close/lost
 * notification (unless a MAX_CON event is present) and closes the channel
 * if it is still open.
 * @throws IOException if closing the channel fails
 */
private void doPostCloseActivity() throws IOException {
    connection = false;
    byteBufferOutputStream.forceNotify();
    getSelectionKey().cancel();

    if(getServer()!=null) {
        getServer().getSelector().wakeup();
    }

    synchronized(this) {
        boolean maxConPending = hasEvent(ClientEvent.MAX_CON);
        if(!maxConPending) {
            notifyCloseOrLost();
        }

        if(getSocketChannel().isOpen()) {
            logger.finest("Closing SocketChannel");
            getSocketChannel().close();
        }
    }
}
/**
 * Closes the socket channel when the SSL engine has finished its outbound
 * side. An IOException while closing is logged and ignored.
 * @return <code>true</code> if outbound was done (channel close attempted),
 *         <code>false</code> otherwise
 * @throws IllegalStateException if the client is not in secure mode
 */
public boolean closeIfSSLOutboundDone() {
    if(!isSecure()) {
        throw new IllegalStateException("Client is not in secure mode!");
    }

    if(!sslEngine.isOutboundDone()) {
        logger.finest("SSL Outbound is not done.");
        return false;
    }

    logger.finest("SSL Outbound is done.");
    try {
        if(getSocketChannel().isOpen()) {
            logger.finest("Closing SocketChannel");
            getSocketChannel().close();
        }
    } catch(IOException e) {
        logger.fine("IGNORE: Error in Closing SocketChannel: "+e);
    }
    return true;
}
/**
 * Blocks the caller while the output stream reports data still waiting to
 * be written. The wait is on a private lock object handed to the output
 * stream and lasts two minutes at most; an interrupt is logged and ends the
 * wait.
 * @since 1.4.7
 */
public void waitTillFullyWritten() {
    final Object waitLock = new Object();
    if(!byteBufferOutputStream.isDataAvailableForWrite(waitLock)) {
        return; // nothing pending
    }

    if(ByteBufferOutputStream.isLoggable(Level.FINEST)) {
        logger.finest("Waiting "+getName());
    }

    final long maxWaitMillis = 2L * 60L * 1000L; //2 min max
    try {
        synchronized(waitLock) {
            waitLock.wait(maxWaitMillis);
        }
    } catch(InterruptedException ie) {
        logger.warning("Error: "+ie);
    }

    if(ByteBufferOutputStream.isLoggable(Level.FINEST)) {
        logger.finest("Done. "+getName());
    }
}
/**
 * Processes one pending client event (ACCEPT, MAX_CON, READ or WRITE).
 * <p>
 * The method takes the next event from the unprocessed queue, records it as
 * the current thread's event and dispatches it. For ACCEPT, READ and WRITE
 * the method normally returns early (after returnThread()) and the handler
 * stays alive. If it falls through - the connection was lost or closed, or
 * an error occurred - the key, socket and channel are closed, client data is
 * returned and, if no other call did so already, the handler itself is
 * returned via returnClientHandler().
 */
public void run() {
    if(unprocessedClientEvents.isEmpty()) {
        logger.finest("No unprocessed ClientEvents!");
        return;
    }

    synchronized(this) {
        if(willReturn) {
            return;
        } else {
            threadAccessCount++;
        }
    }

    ClientEvent currentEvent = (ClientEvent) unprocessedClientEvents.poll();
    if(currentEvent==null) {
        threadEvent.set(null);
        // Undo the increment made above: this thread does no work and never
        // reaches returnThread(), so without this the counter stays too high
        // and returnClientHandler() would spin waiting for it to reach 0.
        synchronized(this) {
            if(threadAccessCount>0) {
                threadAccessCount--;
            }
        }
        logger.finest("No unprocessed ClientEvents! pool was null");
        return;
    }

    if(logger.isLoggable(Level.FINEST)) {
        StringBuilder sb = new StringBuilder();
        sb.append("Running ").append(getName());
        sb.append(" using ");
        sb.append(Thread.currentThread().getName());
        sb.append(" for ");

        synchronized(clientEvents) {
            if(clientEvents.size()>1) {
                sb.append(currentEvent+", Current Events - "+clientEvents);
            } else {
                sb.append(currentEvent);
            }
        }
        logger.finest(sb.toString());
    }

    logger.finest("threadAccessCount: "+threadAccessCount);
    threadEvent.set(currentEvent);

    try {
        if(maxThreadAccessCount!=-1 && threadAccessCount>maxThreadAccessCount) {
            logger.warning("ThreadAccessCount can't go beyond "+maxThreadAccessCount+": "+threadAccessCount);
            if(Assertion.isEnabled()) {
                throw new AssertionError("ThreadAccessCount can't go beyond "+maxThreadAccessCount+": "+threadAccessCount);
            }
            return;
        }

        if(socket==null)
            throw new SocketException("Socket was null!");

        if(getThreadEvent()==ClientEvent.ACCEPT ||
                getThreadEvent()==ClientEvent.MAX_CON) {
            prepareForRun();
            Assertion.affirm(willReturn==false, "WillReturn has to be false!: "+willReturn);
        }

        if(getThreadEvent()==ClientEvent.MAX_CON) {
            processMaxConnection(currentEvent);
        }

        try {
            if(getThreadEvent()==ClientEvent.ACCEPT) {
                registerForRead();
                clientEventHandler.gotConnected(this);

                if(authorised == false) {
                    if(clientAuthenticationHandler==null && authenticator == null) {
                        authorised = true;
                        logger.finest("No Authenticator "+getName()+" so return thread.");
                    } else {
                        if(clientAuthenticationHandler!=null) {
                            AuthStatus authStatus = null;
                            // keep asking while authorisation reports FAILURE
                            do {
                                authStatus = processAuthorisation();
                            } while(authStatus==AuthStatus.FAILURE);

                            if(authStatus==AuthStatus.SUCCESS)
                                authorised = true;
                        } else {
                            processAuthorisation();
                        }
                        if(authorised)
                            logger.finest("Authentication done "+getName()+", so return thread.");
                        else
                            logger.finest("askAuthentication() done "+getName()+", so return thread.");
                    }
                }//end authorised
                returnThread(); //return thread to pool
                return;
            }

            if(connection && getThreadEvent()==ClientEvent.READ) {
                if(processRead()) return;
            }

            if(connection && getThreadEvent()==ClientEvent.WRITE) {
                if(processWrite()) return;
            }

        } catch(SocketException e) {
            appLogger.finest("SocketException - Client [" +
                getHostAddress() +"]: " + e.getMessage());
            //e.printStackTrace();
            lost = true;
        } catch(AppException e) {
            //errors from Application
            appLogger.finest("AppException "+Thread.currentThread().getName()+": "
                + e.getMessage());
        } catch(javax.net.ssl.SSLException e) {
            lost = true;
            if(Assertion.isEnabled()) {
                appLogger.info("SSLException - Client ["+getHostAddress()
                    +"] "+Thread.currentThread().getName()+": " + e);
            } else {
                appLogger.warning("SSLException - Client ["+
                    getHostAddress()+"]: "+e);
            }
        } catch(ConnectionLostException e) {
            lost = true;
            if(e.getMessage()!=null)
                appLogger.finest("Connection lost " +
                    Thread.currentThread().getName()+": " + e.getMessage());
            else
                appLogger.finest("Connection lost "+Thread.currentThread().getName());
        } catch(ClosedChannelException e) {
            lost = true;
            appLogger.finest("Channel closed "+Thread.currentThread().getName()+": " + e);
        } catch(IOException e) {
            lost = true;
            appLogger.fine("IOError "+Thread.currentThread().getName()+": " + e);
        } catch(AssertionError er) {
            logger.warning("[AssertionError] "+getName()+" "+er);
            if(logger.isLoggable(Level.FINEST)) {
                logger.finest("StackTrace "+Thread.currentThread().getName()+": "+MyString.getStackTrace(er));
            }
            assertionSystemExit();
        } catch(RuntimeException re) {
            logger.warning("[RuntimeException] "+MyString.getStackTrace(re));
            if(Assertion.isEnabled()) {
                assertionSystemExit();
            }
            lost = true;
        } catch(Throwable er) {
            logger.warning("[Error] "+er);
            if(logger.isLoggable(Level.FINEST)) {
                logger.finest("StackTrace "+Thread.currentThread().getName()+": "+MyString.getStackTrace(er));
            }
            if(Assertion.isEnabled()) {
                assertionSystemExit();
            }
            lost = true;
        }

        // Reaching this point means the event did not end with an early
        // return: the connection is closed, lost or failed.
        if(getThreadEvent()!=ClientEvent.MAX_CON) {
            notifyCloseOrLost();
        }

        if(connection) {
            logger.finest(Thread.currentThread().getName()+" calling closeConnection()");
            closeConnection();
        }

        if(connection==true && lost==true && waitingForFinalWrite) {
            // release a thread blocked in waitTillFullyWritten()
            byteBufferOutputStream.forceNotify();
        }
    } catch(javax.net.ssl.SSLException se) {
        logger.warning("SSLException "+Thread.currentThread().getName()+" - " + se);
    } catch(IOException ie) {
        logger.warning("IOError "+Thread.currentThread().getName()+" - Closing Client : " + ie);
    } catch(RuntimeException re) {
        logger.warning("[RuntimeException] "+getName()+" "+Thread.currentThread().getName()+" - "+MyString.getStackTrace(re));
        if(Assertion.isEnabled()) {
            assertionSystemExit();
        }
    } catch(Exception e) {
        logger.warning("Error "+Thread.currentThread().getName()+" - Event:"+getThreadEvent()+" - Socket:"+socket+" : "+e);
        logger.fine("StackTrace: "+getName()+"\n"+MyString.getStackTrace(e));
        if(Assertion.isEnabled()) {
            assertionSystemExit();
        }
    } catch(Throwable e) {
        logger.warning("Error "+Thread.currentThread().getName()+" - Event:"+getThreadEvent()+" - Socket:"+socket+" : "+e);
        logger.fine("StackTrace: "+getName()+"\n"+MyString.getStackTrace(e));
        if(Assertion.isEnabled()) {
            assertionSystemExit();
        }
    }

    synchronized(this) {
        try {
            if(getSelectionKey()!=null && getSelectionKey().isValid()) {
                logger.finest("Canceling SelectionKey");
                getSelectionKey().cancel();
            }

            if(socket!=null && socket.isClosed()==false) {
                logger.finest("Closing Socket");
                socket.close();
            }

            if(getSocketChannel()!=null && getSocketChannel().isOpen()) {
                logger.finest("Closing SocketChannel");
                socketChannel.close();
            }
        } catch(Exception re) {
            logger.warning("Error closing Socket/Channel: " +re);
        }
    }//end synchronized

    willClean = true;
    returnClientData();

    boolean returnClientHandler = false;
    synchronized(lockObj) {
        returnThread();
        returnClientHandler = checkReturnClientHandler();
    }

    if(returnClientHandler) {
        returnClientHandler(); //return to pool
    }
}
/**
 * Claims the right to return this handler. The first call flips the
 * willReturn flag and gets <code>true</code>; every later call gets
 * <code>false</code>.
 * @return <code>true</code> only for the call that set the flag
 */
protected boolean checkReturnClientHandler() {
    if(willReturn) {
        return false;
    }
    willReturn = true;
    return true;
}
/**
 * Handles a READ event by delegating to doRead(); when that succeeds the
 * thread is handed back through returnThread().
 * @return <code>true</code> if the calling thread should return from run()
 * @throws Exception anything raised by doRead()
 */
private boolean processRead() throws Exception {
    final boolean stillConnected = doRead();
    if(stillConnected) {
        returnThread(); //return to pool
    }
    return stillConnected;
}
/**
 * Reads everything currently available from the socket channel into pooled
 * buffers (unwrapping through the SSL engine when one is set), queues the
 * resulting buffers on readByteBuffer and then hands the data to
 * processGotDataInBuffers().
 * @return <code>true</code> if the connection is still up and the channel
 *         was re-registered for reading; <code>false</code> if the channel
 *         reached end-of-stream or the connection flag is down
 * @throws Exception any read/unwrap error, rethrown after the connection is
 *         marked lost and waiters on the input stream are notified
 */
private boolean doRead() throws Exception {
    int count = 0;
    int fullCount = 0;

    while(true) {
        try {
            if(peerNetData==null) {
                peerNetData = (ByteBuffer) getServer().getByteBufferPool().borrowObject();
            }

            count = getSocketChannel().read(peerNetData);
            if(count<0) {
                //logger.finest("SocketChannel read was "+count+"!");
                getServer().getByteBufferPool().returnObject(peerNetData);
                peerNetData = null;
                break;
            } else {
                fullCount += count;
            }

            peerNetData.flip(); // Make readable
            ByteBuffer peerAppData = null;
            //--
            if(sslEngine!=null) {
                SSLEngineResult res;
                peerAppData = (ByteBuffer)
                    getServer().getByteBufferPool().borrowObject();

                do {
                    res = sslEngine.unwrap(peerNetData, peerAppData);
                    logger.info("Unwrapping:\n" + res);
                } while(res.getStatus() == SSLEngineResult.Status.OK &&
                        res.getHandshakeStatus() == SSLEngineResult.HandshakeStatus.NEED_UNWRAP &&
                        res.bytesProduced() == 0);

                if(res.getHandshakeStatus() == SSLEngineResult.HandshakeStatus.FINISHED) {
                    logger.info("HandshakeStatus.FINISHED!");
                    finishInitialHandshake();
                }

                if(peerAppData.position() == 0 &&
                        res.getStatus() == SSLEngineResult.Status.OK &&
                        peerNetData.hasRemaining()) {
                    logger.info("peerNetData hasRemaining and pos=0!");
                    res = sslEngine.unwrap(peerNetData, peerAppData);
                    logger.info("Unwrapping:\n" + res);
                }

                /*
                 * OK, OVERFLOW, UNDERFLOW, CLOSED
                 */
                status = res.getStatus();
                handshakeStatus = res.getHandshakeStatus();

                // Fixed: the overflow test was inverted (!=), which logged a
                // "Buffer overflow" warning for every non-overflow result and
                // made the CLOSED branch below unreachable.
                if(status == SSLEngineResult.Status.BUFFER_OVERFLOW) {
                    logger.warning("Buffer overflow: " + res.toString());
                } else if(status == SSLEngineResult.Status.CLOSED) {
                    logger.fine("Connection is being closed by peer.");
                    lost = true;
                    // TODO: SSL shutdown handling is not implemented here
                    System.out.println("NEdd to code for shutdow of SSL");
                    break;
                }

                peerNetData.compact();
                peerAppData.flip();

                if(handshakeStatus == SSLEngineResult.HandshakeStatus.NEED_TASK ||
                        handshakeStatus == SSLEngineResult.HandshakeStatus.NEED_WRAP ||
                        handshakeStatus == SSLEngineResult.HandshakeStatus.FINISHED) {
                    doHandshake();
                }

                //return peerAppData.remaining();
                logger.fine("peerAppData.remaining(): "+peerAppData.remaining());
            } else {
                // plain connection: the network buffer is the application buffer
                peerAppData = peerNetData;
                peerNetData = null;
            }
            //--

            readByteBuffer.add(peerAppData);
            peerAppData = null;
        } catch(Exception error) {
            logger.finest("Error in data read: "+error);
            if(sslEngine!=null) sslEngine.closeInbound();
            lost = true;
            synchronized(getInputStream()) {
                getInputStream().notifyAll();
            }
            throw error;
        }

        if(count==0) break;
    }//end while

    if(count<0) {
        logger.finest("SocketChannel read was "+count+"!");
        if(sslEngine!=null) sslEngine.closeInbound();
        lost = true;
        synchronized(getInputStream()) {
            getInputStream().notifyAll();
        }
    } else {
        logger.finest(fullCount+" bytes read");
        if(fullCount!=0) {
            updateLastCommunicationTime();
            synchronized(getInputStream()) {
                getInputStream().notify(); //if any are waiting
            }
            if(hasEvent(ClientEvent.ACCEPT) == false) {
                processGotDataInBuffers();
            }
        }

        //check if any data was read but not yet processed
        while(getInputStream().available()>0) {
            logger.finest("Sending again for processing...");
            if(hasEvent(ClientEvent.ACCEPT) == false) {
                processGotDataInBuffers();
                break;
            } else {
                // ACCEPT is still being processed: wake readers and retry shortly
                synchronized(getInputStream()) {
                    getInputStream().notifyAll();
                }
                Thread.sleep(100);
            }
        }

        if(connection) {
            registerForRead();
            //getSelectionKey().selector().wakeup();
            return true;
        }
    }//end of else
    logger.finest("We don't have connection, lets return all resources.");
    return false;
}
/**
 * Handles a WRITE event by delegating to doWrite(); when that succeeds the
 * thread is handed back through returnThread().
 * @return <code>true</code> if the calling thread should return from run()
 * @throws IOException anything raised by doWrite()
 */
private boolean processWrite() throws IOException {
    final boolean stillConnected = doWrite();
    if(stillConnected) {
        returnThread(); //return to pool
    }
    return stillConnected;
}
/**
 * Writes pending output to the channel.
 * While an SSL shutdown is in progress it only drives that shutdown and,
 * once it completes, runs doPostCloseActivity(). Otherwise it writes all
 * queued buffers; if some remain, OP_WRITE is registered again, and if all
 * were written the ClientWriteHandler (when set) is invoked.
 * @return <code>true</code> while the handler should stay alive,
 *         <code>false</code> when the connection is gone
 * @throws IOException on write or registration failure
 */
private boolean doWrite() throws IOException {
    if(sslShutdown) {
        boolean shutdownDone = byteBufferOutputStream.doShutdown();
        if(!shutdownDone) {
            return true;
        }
        doPostCloseActivity();
        logger.finest("We don't have connection, lets return all resources.");
        return false;
    }

    updateLastCommunicationTime();

    final boolean allWritten = byteBufferOutputStream.writeAllByteBuffer();
    if(!allWritten) {
        registerWrite();
    } else if(clientWriteHandler!=null) {
        clientWriteHandler.handleWrite(this);
    }

    if(!connection) {
        logger.finest("We don't have connection, lets return all resources.");
        return false;
    }
    return true;
}
/**
 * Marks the calling thread as done with this handler: lowers the thread
 * access count (asserting it did not drop below zero) and removes the
 * event this thread was processing.
 */
protected void returnThread() {
    --threadAccessCount;
    Assertion.affirm(threadAccessCount>=0, "ThreadAccessCount went less the 0! Value: "+threadAccessCount);

    //return is done at ClientThread end
    ClientEvent finishedEvent = (ClientEvent) threadEvent.get();
    removeEvent(finishedEvent);
}
/**
 * Waits for any other threads still inside this handler to leave, then
 * delegates to <code>super.returnClientHandler()</code>.
 * The wait polls the thread access count every 60 ms; after 100 polls it
 * logs a warning and forces the count to 0.
 */
protected void returnClientHandler() {
    logger.finest(getName());
    try {
        int polls = 0;
        while(threadAccessCount!=0) {
            if(polls==100) {
                logger.warning("ClientHandler must have got into a loop waiting for thread to free up! ThreadAccessCount="+threadAccessCount);
                threadAccessCount = 0;
                if(Assertion.isEnabled()) {
                    assertionSystemExit();
                } else {
                    break;
                }
            }
            if(threadAccessCount<=0) {
                break;
            }
            logger.finest("Waiting for other thread of "+getName()+" to finish");
            Thread.sleep(60);
            polls++;
        }
    } catch(InterruptedException ie) {
        appLogger.warning("InterruptedException: "+ie);
    }
    super.returnClientHandler();
}
/**
 * Switches the data mode for the given direction. Nothing happens when the
 * requested mode is already active; otherwise the change is validated by
 * the superclass and then applied.
 * @param dataMode the mode to switch to
 * @param dataType the direction (IN or OUT) the mode applies to
 * @throws IOException if the streams cannot be switched
 */
public void setDataMode(DataMode dataMode, DataType dataType)
        throws IOException {
    boolean alreadySet = (getDataMode(dataType)==dataMode);
    if(!alreadySet) {
        appLogger.fine("Setting Type:"+dataType+", Mode:"+dataMode);
        super.checkDataModeSet(dataMode, dataType);
        setDataModeNonBlocking(dataMode, dataType);
    }
}
/**
 * Applies a data mode change and swaps the wrapper streams to match.
 * <ul>
 * <li>STRING, BYTE and BINARY output use the buffered output stream; when
 *     coming from OBJECT mode the object stream is flushed and dropped.</li>
 * <li>STRING, BYTE and BINARY input read straight from the raw input stream,
 *     so every input wrapper is cleared.</li>
 * <li>OBJECT output creates a new ObjectOutputStream; OBJECT input is not
 *     supported by this handler.</li>
 * </ul>
 * @param dataMode the mode to switch to
 * @param dataType the direction (IN or OUT) the mode applies to
 * @throws IOException if a stream cannot be flushed or created
 * @throws IllegalArgumentException for OBJECT input, an unknown mode, or an
 *         unknown direction in BYTE/BINARY mode
 */
private void setDataModeNonBlocking(DataMode dataMode, DataType dataType)
        throws IOException {
    logger.finest("ENTER");
    if(dataMode == DataMode.STRING) {
        if(dataType == DataType.OUT) {
            if(dataModeOUT == DataMode.BYTE || dataModeOUT == DataMode.BINARY) {
                dataModeOUT = dataMode;
            } else if(dataModeOUT == DataMode.OBJECT) {
                dataModeOUT = dataMode;
                o_out.flush(); o_out = null;
                b_out = new BufferedOutputStream(out);
            } else {
                Assertion.affirm(false, "Unknown DataType.OUT DataMode - "+dataModeOUT);
            }
            Assertion.affirm(b_out!=null, "BufferedOutputStream is still null!");
            Assertion.affirm(o_out==null, "ObjectOutputStream is still not null!");
        } else if(dataType == DataType.IN) {
            dataModeIN = dataMode;

            if(o_in!=null) {
                if(o_in.available()!=0)
                    logger.warning("Data looks to be present in ObjectInputStream");
                o_in = null;
            }
            b_in = null;
            bufferedReader = null;
            //input stream will work
            Assertion.affirm(in!=null, "InputStream is still null!");
            Assertion.affirm(b_in==null, "BufferedInputStream is still not null!");
            Assertion.affirm(bufferedReader==null, "BufferedReader is still not null!");
        }
    } else if(dataMode == DataMode.OBJECT) {
        if(dataType == DataType.IN) {
            //we will disable this for now
            // (the branch that used to set up an ObjectInputStream could never
            // run after this throw and has been removed)
            throw new IllegalArgumentException("Can't set DataType.IN mode to OBJECT when blocking mode is set as false!");
        }

        if(dataType == DataType.OUT) {
            dataModeOUT = dataMode;
            b_out = null;
            o_out = new ObjectOutputStream(out);
            Assertion.affirm(o_out!=null, "ObjectOutputStream is still null!");
            o_out.flush();
        }
    } else if(dataMode == DataMode.BYTE || dataMode == DataMode.BINARY) {
        if(dataType == DataType.OUT) {
            if(dataModeOUT == DataMode.STRING ||
                    dataModeOUT == DataMode.BYTE ||
                    dataModeOUT == DataMode.BINARY) {
                dataModeOUT = dataMode;
            } else if(dataModeOUT == DataMode.OBJECT) {
                dataModeOUT = dataMode;
                // Flush before dropping the object stream, as the STRING branch
                // does, so data still buffered inside it is not lost.
                if(o_out!=null) {
                    o_out.flush();
                }
                o_out = null;
                b_out = new BufferedOutputStream(out);
            } else {
                Assertion.affirm(false, "Unknown DataType.OUT - DataMode: "+dataModeOUT);
            }
            Assertion.affirm(b_out!=null, "BufferedOutputStream is still null!");
        } else if(dataType == DataType.IN) {
            dataModeIN = dataMode;
            o_in = null;
            bufferedReader = null;
            b_in = null;
            //input stream will work
            Assertion.affirm(in!=null, "InputStream is still null!");
        } else {
            throw new IllegalArgumentException("Unknown DataType : "+dataType);
        }
    } else {
        throw new IllegalArgumentException("Unknown DataMode : "+dataMode);
    }
}
/**
 * Reads bytes from this handler's own input stream by delegating to the
 * overload that takes a stream.
 * @return whatever the delegate returns
 * @throws IOException if reading fails
 */
protected byte[] readInputStream() throws IOException {
    final InputStream source = getInputStream();
    return readInputStream(source);
}
/**
 * Installs the buffer-backed streams for this client: an output stream over
 * writeByteBuffer and an input stream over readByteBuffer. When an SSL
 * engine is present it is put in server mode and the handshake is started;
 * the handshake itself is driven later by reads and writes.
 * @throws IOException if a stream cannot be set up or the handshake
 *         cannot be started
 */
public void updateInputOutputStreams() throws IOException {
    byteBufferOutputStream = new ByteBufferOutputStream(writeByteBuffer, this);
    setInputStream( new ByteBufferInputStream(readByteBuffer, this, getCharset()) );
    setOutputStream(byteBufferOutputStream);

    if(sslEngine==null) {
        return; // plain connection, nothing more to prepare
    }

    sslEngine.setUseClientMode(false);
    sslEngine.beginHandshake();
    handshakeStatus = sslEngine.getHandshakeStatus();
    initialHandshakeStatus = true;
}
/**
 * Reports the blocking mode of this handler.
 * @return always <code>false</code>
 */
public boolean getBlockingMode() {
    return false;
}
/**
 * Stores the channel this handler works on.
 * @param socketChannel the client's channel
 */
public void setSocketChannel(SocketChannel socketChannel) {
    this.socketChannel = socketChannel;
}
/**
 * Gives the channel this handler works on.
 * @return the stored channel; <code>null</code> if none is set
 */
public SocketChannel getSocketChannel() {
    return this.socketChannel;
}
/**
 * Stores the selection key belonging to this handler's channel.
 * @param selectionKey the key to remember
 */
public void setSelectionKey(SelectionKey selectionKey) {
    this.selectionKey = selectionKey;
}
/**
 * Gives the selection key of this handler's channel. When no key has been
 * stored yet it is looked up from the channel using the server's selector
 * and the result is remembered.
 * @return the key, or <code>null</code> if the lookup found none
 */
public SelectionKey getSelectionKey() {
    if(selectionKey!=null) {
        return selectionKey;
    }
    selectionKey = getSocketChannel().keyFor(getServer().getSelector());
    return selectionKey;
}
/**
 * Drains the data buffered on the input stream and dispatches it according
 * to the current incoming data mode (STRING, OBJECT, BYTE or BINARY).
 * Each received unit goes to the authentication handler while the client is
 * not authorised, and to the matching command/object/binary handler
 * afterwards. The method returns early when the stream yields null (the
 * connection is then marked lost) or when the client got closed by a
 * handler. The outer loop repeats while data is available, so a handler may
 * change the data mode in mid-stream.
 * @throws IllegalStateException if the incoming data mode is none of the
 *         four supported modes
 */
private void processGotDataInBuffers() throws AppException,
ConnectionLostException, ClassNotFoundException, IOException {
if(getInputStream().available()==0) return;
logger.finest("Trying to process got data.. DataMode.IN="+dataModeIN);
AuthStatus authStatus = null;
//--For debug
((ByteBufferInputStream) getInputStream()).dumpContent();
String temp = null;
String rec = null;
Object recObject = null;
byte[] recByte = null;
// stays true for a STRING pass in which no complete line could be read
boolean timeToCheckForNewLineMiss = false;
do {
//updateLastCommunicationTime();
if(dataModeIN == DataMode.STRING) {
ByteBufferInputStream bbin = (ByteBufferInputStream)
getInputStream();
timeToCheckForNewLineMiss = true;
while(bbin.isLineReady()) {
rec = bbin.readLine();
if(rec==null) {
lost = true;
return;
}
if(getCommunicationLogging() && authorised == true) {
appLogger.log(Level.FINE, "Got STRING [{0}] : {1}", new Object[]{getHostAddress(), rec});
}
// NOTE(review): adds the character count of the line, not its byte count
totalReadBytes = totalReadBytes + rec.length();
if(authorised == false)
authStatus = clientAuthenticationHandler.handleAuthentication(this, rec);
else
clientCommandHandler.handleCommand(this, rec);
if(isClosed()==true) return;
// repeat authorisation for as long as it reports FAILURE
while(authStatus==AuthStatus.FAILURE)
authStatus = processAuthorisation();
if(authStatus==AuthStatus.SUCCESS)
authorised = true;
// a handler switched the data mode: leave the line loop
if(dataModeIN != DataMode.STRING) {
break;
}
timeToCheckForNewLineMiss = false;
}//end of while
// no line was ready and nothing is left in the byte buffers: wait for more data
if(timeToCheckForNewLineMiss && bbin.availableOnlyInByteBuffer()==0) {
return;
} else {
timeToCheckForNewLineMiss = false;
}
}
//if(dataModeIN == DataMode.OBJECT) {
while(dataModeIN == DataMode.OBJECT && o_in!=null) {
//not sure if all bytes are in buffer..~ may need more read.. will get stuck..
recObject = o_in.readObject();
if(recObject==null) {
lost = true;
return;
}
if(getCommunicationLogging() && authorised == true) {
appLogger.log(Level.FINE, "Got OBJECT [{0}] : {1}", new Object[]{getHostAddress(), recObject.toString()});
}
// objects are counted as one unit each
totalReadBytes = totalReadBytes + 1;
if(authorised == false)
authStatus = clientAuthenticationHandler.handleAuthentication(this, recObject);
else
clientObjectHandler.handleObject(this, recObject);
if(isClosed()==true) return;
while(authStatus==AuthStatus.FAILURE)
authStatus = processAuthorisation();
if(authStatus==AuthStatus.SUCCESS)
authorised = true;
}
//}
//if(dataModeIN == DataMode.BYTE) {
while(dataModeIN == DataMode.BYTE && getInputStream().available()!=0) {
rec = readBytes();
if(rec==null) {
lost = true;
return;
}
if(getCommunicationLogging() && authorised == true) {
appLogger.log(Level.FINE, "Got BYTE [{0}] : {1}",
new Object[]{getHostAddress(), rec});
}
totalReadBytes = totalReadBytes + rec.length();
if(authorised == false)
authStatus = clientAuthenticationHandler.handleAuthentication(this, rec);
else
clientCommandHandler.handleCommand(this, rec);
if(isClosed()==true) return;
while(authStatus==AuthStatus.FAILURE)
authStatus = processAuthorisation();
if(authStatus==AuthStatus.SUCCESS)
authorised = true;
}
//} else if(dataModeIN == DataMode.BINARY) {
while(dataModeIN == DataMode.BINARY && getInputStream().available()!=0) {
recByte = readBinary();
if(recByte==null) {
lost = true;
return;
}
// Raw content is logged only for authorised clients with raw logging on;
// otherwise only the size of the data is logged.
if(getCommunicationLogging() && authorised == true) {
if(getServer().isRawCommunicationLogging()) {
if(getServer().getRawCommunicationMaxLength()>0 &&
recByte.length>getServer().getRawCommunicationMaxLength()) {
appLogger.log(Level.FINE,
"Got BINARY [{0}] : {1}; RAW: {2}{3}", new Object[]{
getHostAddress(), MyString.getMemInfo(recByte.length),
new String(recByte,0,getServer().getRawCommunicationMaxLength(),charset),"..."});
} else {
appLogger.log(Level.FINE,
"Got BINARY [{0}] : {1}; RAW: {2}", new Object[]{
getHostAddress(), MyString.getMemInfo(recByte.length),
new String(recByte,charset)});
}
} else {
appLogger.log(Level.FINE,
"Got BINARY [{0}] : {1}", new Object[]{getHostAddress(),
MyString.getMemInfo(recByte.length)});
}
} else if (getCommunicationLogging()) {
appLogger.log(Level.FINE,
"Got BINARY [{0}] : {1}", new Object[]{getHostAddress(),
MyString.getMemInfo(recByte.length)});
}
totalReadBytes = totalReadBytes + recByte.length;
if(authorised == false)
authStatus = clientAuthenticationHandler.handleAuthentication(this, recByte);
else
clientBinaryHandler.handleBinary(this, recByte);
if(isClosed()==true) return;
while(authStatus==AuthStatus.FAILURE)
authStatus = processAuthorisation();
if(authStatus==AuthStatus.SUCCESS)
authorised = true;
}
//} else {
if(dataModeIN != DataMode.STRING && dataModeIN != DataMode.OBJECT
&& dataModeIN != DataMode.BYTE && dataModeIN != DataMode.BINARY) {
throw new IllegalStateException("Incoming DataMode is not supported : "+dataModeIN);
}
} while(getInputStream().available()!=0);
}
/**
 * Makes sure the channel is registered with the server's selector for
 * OP_READ. Without a key the channel is registered through the server;
 * with a valid key OP_READ is added to the interest set if missing (any
 * pending READ event is removed first) and the selector is optionally
 * woken up.
 * @throws IOException if the selection key is invalid or was cancelled; in
 *         the latter case the CancelledKeyException is attached as the cause
 * @throws ClosedChannelException if the channel is closed
 */
public void registerForRead()
        throws IOException, ClosedChannelException {
    //System.out.println("registerForRead..");
    //(new Exception()).printStackTrace();
    try {
        if(getSelectionKey()==null) {
            boolean flag = getServer().registerChannel(getSocketChannel(),
                SelectionKey.OP_READ, this);
            if(flag) {
                logger.finest("Adding OP_READ as interest Ops for "+getName());
            } else if(ByteBufferOutputStream.isLoggable(Level.FINEST)) {
                logger.finest("OP_READ is already present in interest Ops for "+getName());
            }
        } else if(getSelectionKey().isValid()) {
            if((getSelectionKey().interestOps() & SelectionKey.OP_READ) == 0 ) {
                logger.finest("Adding OP_READ to interest Ops for "+getName());
                removeEvent(ClientEvent.READ);
                getSelectionKey().interestOps(getSelectionKey().interestOps()
                    | SelectionKey.OP_READ);
                if(wakeupSelectorAfterRegisterRead) {
                    getServer().getSelector().wakeup();
                }
            } else {
                if(ByteBufferOutputStream.isLoggable(Level.FINEST)) {
                    logger.finest("OP_READ is already present in interest Ops for "+getName());
                }
            }
        } else {
            throw new IOException("SelectionKey is invalid!");
        }
    } catch(CancelledKeyException e) {
        // keep the original exception as the cause instead of dropping it
        IOException ioe = new IOException("SelectionKey is cancelled!");
        ioe.initCause(e);
        throw ioe;
    }
}
/**
 * Public entry point for asking to be called back when the channel is
 * writable. Only allowed when no blocking-mode event is present and a
 * ClientWriteHandler has been set; the work is done by registerWrite().
 * @throws IllegalStateException if a RUN_BLOCKING or MAX_CON_BLOCKING event
 *         is present, or no ClientWriteHandler is set
 * @throws IOException if the registration fails
 */
public void registerForWrite()
        throws IOException, ClosedChannelException {
    boolean blockingEventPresent = hasEvent(ClientEvent.RUN_BLOCKING)
        || hasEvent(ClientEvent.MAX_CON_BLOCKING);
    if(blockingEventPresent) {
        throw new IllegalStateException("This method is only allowed under Non-Blocking mode.");
    }

    if(clientWriteHandler==null) {
        throw new IllegalStateException("ClientWriteHandler has not been set!");
    }

    registerWrite();
}
/**
 * Makes sure the channel is registered with the server's selector for
 * OP_WRITE. Without a key the channel is registered through the server;
 * with a valid key OP_WRITE is added to the interest set if missing (any
 * pending WRITE event is removed first) and the selector is optionally
 * woken up.
 * @throws IOException if the selection key is invalid or was cancelled; in
 *         the latter case the CancelledKeyException is attached as the cause
 */
public void registerWrite() throws IOException {
    //System.out.println("registerWrite..");
    //(new Exception()).printStackTrace();
    try {
        if(getSelectionKey()==null) {
            boolean flag = getServer().registerChannel(getSocketChannel(),
                SelectionKey.OP_WRITE, this);
            if(flag) {
                logger.finest("Adding OP_WRITE as interest Ops for "+getName());
            } else if(ByteBufferOutputStream.isLoggable(Level.FINEST)) {
                logger.finest("OP_WRITE is already present in interest Ops for "+getName());
            }
        } else if(getSelectionKey().isValid()) {
            if((getSelectionKey().interestOps() & SelectionKey.OP_WRITE) == 0 ) {
                logger.finest("Adding OP_WRITE to interest Ops for "+getName());
                removeEvent(ClientEvent.WRITE);
                getSelectionKey().interestOps(getSelectionKey().interestOps()
                    | SelectionKey.OP_WRITE);
                if(wakeupSelectorAfterRegisterWrite) {
                    getServer().getSelector().wakeup();
                }
            } else {
                if(ByteBufferOutputStream.isLoggable(Level.FINEST)) {
                    logger.finest("OP_WRITE is already present in interest Ops for "+getName());
                }
            }
        } else {
            throw new IOException("SelectionKey is invalid!");
        }
    } catch(CancelledKeyException e) {
        // keep the original exception as the cause instead of dropping it
        IOException ioe = new IOException("SelectionKey is cancelled!");
        ioe.initCause(e);
        throw ioe;
    }
}
/**
 * Stores the handler that doWrite() calls once all pending data is written.
 * @param handler the write handler; may be <code>null</code>
 */
protected void setClientWriteHandler(ClientWriteHandler handler) {
    this.clientWriteHandler = handler;
}
/**
 * Gives the number of threads currently counted as being inside this
 * handler.
 * @return the current thread access count
 * @since 1.4.6
 */
public int getThreadAccessCount() {
    return this.threadAccessCount;
}
/**
 * Drives the SSL handshake as far as it can go without new input.
 * Loops on the current handshake status: runs delegated tasks for
 * NEED_TASK, wraps and writes handshake data for NEED_WRAP, and returns on
 * FINISHED (completing the initial handshake if it was pending), on
 * NEED_UNWRAP (more data must be read first) or when doWrite() reports the
 * connection is gone.
 * @throws Exception on pool, SSL engine or write failure
 */
private void doHandshake() throws Exception {
    for(;;) {
        logger.fine("handshakeStatus: "+handshakeStatus);

        if(handshakeStatus==SSLEngineResult.HandshakeStatus.FINISHED) {
            if(initialHandshakeStatus) {
                finishInitialHandshake();
            }
            return;
        }

        if(handshakeStatus==SSLEngineResult.HandshakeStatus.NEED_TASK) {
            doTasks();
            continue;
        }

        if(handshakeStatus==SSLEngineResult.HandshakeStatus.NEED_UNWRAP) {
            // nothing to do until the next read delivers data
            return;
        }

        if(handshakeStatus==SSLEngineResult.HandshakeStatus.NEED_WRAP) {
            ByteBuffer netData = (ByteBuffer) getServer().getByteBufferPool().borrowObject();
            SSLEngineResult wrapResult = sslEngine.wrap(dummyByteBuffer, netData);
            logger.info("Wrapping:\n" + wrapResult);
            assert wrapResult.bytesProduced() != 0 : "No net data produced during handshake wrap.";
            assert wrapResult.bytesConsumed() == 0 : "App data consumed during handshake wrap.";
            handshakeStatus = wrapResult.getHandshakeStatus();
            // no flip here; it is done when the buffer is written to the channel
            byteBufferOutputStream.addEncryptedByteBuffer(netData);
            if(!doWrite()) {
                return;
            }
            continue; //back to loop
        }

        if(handshakeStatus==SSLEngineResult.HandshakeStatus.NOT_HANDSHAKING) {
            assert false : "doHandshake() should never reach the NOT_HANDSHAKING state";
            return;
        }
    }
}
/**
 * Runs every delegated task the SSL engine currently has, on the calling
 * thread, then refreshes the stored handshake status from the engine.
 */
private void doTasks() {
    Runnable pending = sslEngine.getDelegatedTask();
    while(pending != null) {
        logger.fine("Running the task.. START ");
        pending.run();
        logger.fine("Running the task.. END");
        pending = sslEngine.getDelegatedTask();
    }
    handshakeStatus = sslEngine.getHandshakeStatus();
    logger.fine("handshakeStatus: "+handshakeStatus);
}
/**
 * Records that the initial SSL handshake is over by clearing the
 * initial-handshake flag.
 * @throws IOException declared for callers; not thrown by this body
 */
private void finishInitialHandshake() throws IOException {
    this.initialHandshakeStatus = false;
}
/**
 * Tells whether the initial SSL handshake is still in progress.
 * @return <code>true</code> from the start of the handshake until it
 *         has finished
 */
public boolean getInitialHandshakeStatus() {
    return this.initialHandshakeStatus;
}
/**
 * Encrypts application data with the SSL engine.
 * The source buffer is flipped and wrapped into a buffer borrowed from the
 * server's ByteBuffer pool. The returned buffer is not flipped.
 * @param src application data, positioned at the end of the written data
 * @return the buffer holding the encrypted data, or <code>null</code> while
 *         the initial handshake is still in progress
 * @throws IOException if wrapping fails; any non-IO failure is wrapped with
 *         the original exception attached as the cause
 */
public ByteBuffer encrypt(ByteBuffer src) throws IOException {
    if(initialHandshakeStatus) {
        logger.fine("Writing not possible during handshake!");
        //Exception e = new Exception();
        //e.printStackTrace();
        return null;
    }

    ByteBuffer dest = null;
    // stays true unless wrap completes, so the finally block gives the
    // buffer back on any failure, not only on Exception
    boolean failed = true;
    try {
        src.flip();
        dest = (ByteBuffer) getServer().getByteBufferPool().borrowObject();
        //dest.clear();
        SSLEngineResult res = sslEngine.wrap(src, dest);
        logger.info("Wrapping:\n" + res);
        //dest.flip();
        failed = false;
        return dest;
    } catch(IOException e) {
        logger.warning("IOException:" + e);
        throw e;
    } catch(Exception e) {
        logger.warning("Exception:" + e);
        // keep the original exception as the cause instead of only its message
        IOException wrapped = new IOException(e.getMessage());
        wrapped.initCause(e);
        throw wrapped;
    } finally {
        if(failed && dest!=null) {
            try {
                getServer().getByteBufferPool().returnObject(dest);
            } catch(Exception er) {
                logger.warning("Error in returning ByteBuffer to pool: "+er);
            }
        }
    }
}
}