// blob: 824491160a8cec5a19776aea72d9c2d41bc6481b [file] [log] [blame]
// SPDX-License-Identifier: LGPL-2.1-or-later
// Copyright (c) 2012-2014 Monty Program Ab
// Copyright (c) 2015-2021 MariaDB Corporation Ab
package org.mariadb.jdbc.client.impl;
import java.sql.SQLException;
import java.sql.SQLNonTransientConnectionException;
import java.util.List;
import java.util.concurrent.Executor;
import java.util.concurrent.locks.ReentrantLock;
import org.mariadb.jdbc.Configuration;
import org.mariadb.jdbc.HostAddress;
import org.mariadb.jdbc.Statement;
import org.mariadb.jdbc.client.Client;
import org.mariadb.jdbc.client.Completion;
import org.mariadb.jdbc.client.Context;
import org.mariadb.jdbc.export.ExceptionFactory;
import org.mariadb.jdbc.export.Prepare;
import org.mariadb.jdbc.message.ClientMessage;
import org.mariadb.jdbc.util.log.Logger;
import org.mariadb.jdbc.util.log.Loggers;
/**
 * Client for replication topologies: keeps one connection to a primary host and one to a replica
 * host, and reconnects a failing connection transparently when possible.
 *
 * <p>{@code currentClient} (inherited from the parent class) is pointed at the replica connection
 * while read-only mode is requested and at the primary connection otherwise. When no replica can
 * be reached, the primary connection is used as a fallback until a replica is available again.
 *
 * <p>Remark: a proxy would have been a better fit, but proxy classes are avoided because they are
 * not supported by AOT compilation.
 */
public class MultiPrimaryReplicaClient extends MultiPrimaryClient {
  private static final Logger logger = Loggers.getLogger(MultiPrimaryReplicaClient.class);

  /**
   * Delay, in milliseconds, before retrying to reconnect a failing host. Read from the non-mapped
   * option {@code waitReconnectTimeout}, defaulting to 30000.
   */
  protected long waitTimeout;

  /** Connection to a replica host, or {@code null} when none is currently established. */
  private Client replicaClient;

  /** Connection to a primary host, or {@code null} when none is currently established. */
  private Client primaryClient;

  /** Last read-only mode requested through {@link #setReadOnly(boolean)}. */
  private boolean requestReadOnly;

  /** Earliest time (epoch milliseconds) of the next replica reconnection attempt. */
  private long nextTryReplica = -1;

  /** Earliest time (epoch milliseconds) of the next primary reconnection attempt. */
  private long nextTryPrimary = -1;

  /**
   * Constructor. The parent constructor provides the initial (primary) connection; a replica
   * connection is then attempted on a best-effort basis.
   *
   * @param conf configuration
   * @param lock thread locker
   * @throws SQLException if any error occurs
   */
  public MultiPrimaryReplicaClient(Configuration conf, ReentrantLock lock) throws SQLException {
    super(conf, lock);
    primaryClient = currentClient;
    waitTimeout =
        Long.parseLong(conf.nonMappedOptions().getProperty("waitReconnectTimeout", "30000"));
    try {
      replicaClient = connectHost(true, false);
    } catch (SQLException ignored) {
      // a missing replica is not fatal: the primary is used until a replica can be reached
      replicaClient = null;
      nextTryReplica = System.currentTimeMillis() + waitTimeout;
    }
  }

  /**
   * Opportunistically re-establishes a missing primary and/or replica connection once the
   * corresponding retry delay has elapsed. Failures are not reported; they only postpone the next
   * attempt by {@link #waitTimeout}.
   */
  private void reconnectIfNeeded() {
    if (closed) {
      return;
    }

    // try to reconnect primary
    if (primaryClient == null && nextTryPrimary < System.currentTimeMillis()) {
      try {
        primaryClient = connectHost(false, true);
        nextTryPrimary = -1;
      } catch (SQLException ignored) {
        nextTryPrimary = System.currentTimeMillis() + waitTimeout;
      }
    }

    // try to reconnect replica
    if (replicaClient == null && nextTryReplica < System.currentTimeMillis()) {
      try {
        replicaClient = connectHost(true, true);
        nextTryReplica = -1;
        if (requestReadOnly) {
          // Switch first, then sync: as at every other call site of this class, syncNewState is
          // invoked once currentClient is the new client, with the previous client as argument.
          currentClient = replicaClient;
          syncNewState(primaryClient);
        }
      } catch (SQLException ignored) {
        nextTryReplica = System.currentTimeMillis() + waitTimeout;
      }
    }
  }

  /**
   * Reconnect connection, trying to continue transparently if possible. Different possible cases:
   *
   * <ul>
   *   <li>read-only mode: reconnect to a replica, or fall back to the primary connection (creating
   *       one if needed) when no replica is available;
   *   <li>otherwise: reconnect to a primary, failing if none can be reached.
   * </ul>
   *
   * <p>If reconnection succeeds on a replica / falls back to the primary, execution continues
   * without interruption. If a primary was reconnected, the caller replays the transaction or
   * throws if a transaction was in progress.
   *
   * @return {@code null} when read-only mode is requested, else the client that failed
   * @throws SQLException if no suitable connection can be re-established; in that case this client
   *     is marked closed and remaining connections are closed
   */
  @Override
  protected Client reConnect() throws SQLException {
    denyList.putIfAbsent(
        currentClient.getHostAddress(), System.currentTimeMillis() + deniedListTimeout);
    logger.info("Connection error on {}", currentClient.getHostAddress());

    try {
      Client oldClient = currentClient;
      if (oldClient.isPrimary()) {
        primaryClient = null;
      } else {
        replicaClient = null;
      }

      // remove cached prepare from existing server prepare statement
      oldClient.getContext().resetPrepareCache();

      try {
        currentClient = connectHost(requestReadOnly, requestReadOnly);
        if (requestReadOnly) {
          nextTryReplica = -1;
          replicaClient = currentClient;
        } else {
          nextTryPrimary = -1;
          primaryClient = currentClient;
        }
      } catch (SQLNonTransientConnectionException e) {
        if (!requestReadOnly) {
          throw new SQLNonTransientConnectionException(
              String.format(
                  "Driver has failed to reconnect master connection after a "
                      + "communications "
                      + "failure with %s",
                  oldClient.getHostAddress()),
              "08000",
              e);
        }

        nextTryReplica = System.currentTimeMillis() + waitTimeout;
        if (primaryClient != null) {
          // connector will use primary client until some replica is up
          currentClient = primaryClient;
        } else {
          // replication fails, and no primary connection
          // trying to create new primary connection
          try {
            primaryClient = connectHost(false, false);
            currentClient = primaryClient;
            nextTryPrimary = -1;
          } catch (SQLNonTransientConnectionException ee) {
            closed = true;
            ee.addSuppressed(e);
            throw new SQLNonTransientConnectionException(
                String.format(
                    "Driver has failed to reconnect connection after a "
                        + "communications "
                        + "failure with %s",
                    oldClient.getHostAddress()),
                "08000",
                ee);
          }
        }
      }

      syncNewState(oldClient);

      // if reconnect succeed on replica / use master, no problem, continuing without interruption
      // if reconnect primary, then replay transaction / throw exception if was in transaction.
      return requestReadOnly ? null : oldClient;

    } catch (SQLNonTransientConnectionException sqle) {
      currentClient = null;
      closed = true;
      // close() is a no-op once closed is set, so release remaining connections here; a failure
      // while closing must not hide the reconnection error.
      closeAfterFailure(replicaClient, sqle);
      closeAfterFailure(primaryClient, sqle);
      throw sqle;
    }
  }

  /**
   * Closes {@code client} if it is not {@code null}, recording any close error as suppressed on
   * {@code failure} instead of propagating it.
   */
  private static void closeAfterFailure(Client client, SQLException failure) {
    if (client == null) {
      return;
    }
    try {
      client.close();
    } catch (SQLException closeError) {
      failure.addSuppressed(closeError);
    }
  }

  /** Attempts pending reconnections, then delegates to the parent implementation. */
  @Override
  public List<Completion> execute(
      ClientMessage message,
      Statement stmt,
      int fetchSize,
      long maxRows,
      int resultSetConcurrency,
      int resultSetType,
      boolean closeOnCompletion,
      boolean canRedo)
      throws SQLException {
    reconnectIfNeeded();
    return super.execute(
        message,
        stmt,
        fetchSize,
        maxRows,
        resultSetConcurrency,
        resultSetType,
        closeOnCompletion,
        canRedo);
  }

  /** Attempts pending reconnections, then delegates to the parent implementation. */
  @Override
  public List<Completion> executePipeline(
      ClientMessage[] messages,
      Statement stmt,
      int fetchSize,
      long maxRows,
      int resultSetConcurrency,
      int resultSetType,
      boolean closeOnCompletion,
      boolean canRedo)
      throws SQLException {
    reconnectIfNeeded();
    return super.executePipeline(
        messages,
        stmt,
        fetchSize,
        maxRows,
        resultSetConcurrency,
        resultSetType,
        closeOnCompletion,
        canRedo);
  }

  /** Attempts pending reconnections, then delegates to the parent implementation. */
  @Override
  public void readStreamingResults(
      List<Completion> completions,
      int fetchSize,
      long maxRows,
      int resultSetConcurrency,
      int resultSetType,
      boolean closeOnCompletion)
      throws SQLException {
    reconnectIfNeeded();
    super.readStreamingResults(
        completions, fetchSize, maxRows, resultSetConcurrency, resultSetType, closeOnCompletion);
  }

  /** Attempts pending reconnections, then delegates to the parent implementation. */
  @Override
  public void closePrepare(Prepare prepare) throws SQLException {
    reconnectIfNeeded();
    super.closePrepare(prepare);
  }

  /** Attempts pending reconnections, then delegates to the parent implementation. */
  @Override
  public void abort(Executor executor) throws SQLException {
    reconnectIfNeeded();
    super.abort(executor);
  }

  /**
   * Closes both the primary and the replica connection, if any. Errors raised while closing either
   * connection are deliberately ignored. Calling this method on an already closed client does
   * nothing.
   */
  @Override
  public void close() throws SQLException {
    if (closed) {
      return;
    }
    closed = true;

    try {
      if (primaryClient != null) {
        primaryClient.close();
      }
    } catch (SQLException ignored) {
      // best effort: the connection is being discarded anyway
    }

    try {
      if (replicaClient != null) {
        replicaClient.close();
      }
    } catch (SQLException ignored) {
      // best effort: the connection is being discarded anyway
    }

    primaryClient = null;
    replicaClient = null;
  }

  /**
   * Switches the active connection between replica (read-only) and primary.
   *
   * <p>When switching to read-only and no replica can be reached, the current connection is kept.
   * When switching back and a primary reconnection attempt fails, an exception is thrown.
   *
   * @param readOnly {@code true} to use a replica connection, {@code false} to use the primary
   * @throws SQLException if the client is closed, or if a primary connection is required but
   *     cannot be re-established
   */
  @Override
  public void setReadOnly(boolean readOnly) throws SQLException {
    if (closed) {
      throw new SQLNonTransientConnectionException("Connection is closed", "08000", 1220);
    }

    if (readOnly) {
      // changed ?
      if (!requestReadOnly) {
        if (replicaClient != null) {
          currentClient = replicaClient;
          syncNewState(primaryClient);
        } else if (nextTryReplica < System.currentTimeMillis()) {
          try {
            replicaClient = connectHost(true, true);
            currentClient = replicaClient;
            nextTryReplica = -1;
            syncNewState(primaryClient);
          } catch (SQLException ignored) {
            // no replica reachable: keep using the current connection and retry later
            nextTryReplica = System.currentTimeMillis() + waitTimeout;
          }
        }
      }
    } else {
      // changed ?
      if (requestReadOnly) {
        if (primaryClient != null) {
          currentClient = primaryClient;
          syncNewState(replicaClient);
        } else if (nextTryPrimary < System.currentTimeMillis()) {
          try {
            primaryClient = connectHost(false, false);
            // the freshly connected primary must become the active client, mirroring the
            // replica branch above
            currentClient = primaryClient;
            nextTryPrimary = -1;
            syncNewState(replicaClient);
          } catch (SQLException e) {
            nextTryPrimary = System.currentTimeMillis() + waitTimeout;
            throw new SQLNonTransientConnectionException(
                "Driver has failed to reconnect a primary connection", "08000", e);
          }
        }
      }
    }
    requestReadOnly = readOnly;
  }

  /** Attempts pending reconnections, then delegates to the parent implementation. */
  @Override
  public int getSocketTimeout() {
    reconnectIfNeeded();
    return super.getSocketTimeout();
  }

  /** Attempts pending reconnections, then delegates to the parent implementation. */
  @Override
  public void setSocketTimeout(int milliseconds) throws SQLException {
    reconnectIfNeeded();
    super.setSocketTimeout(milliseconds);
  }

  /** Attempts pending reconnections, then delegates to the parent implementation. */
  @Override
  public Context getContext() {
    reconnectIfNeeded();
    return super.getContext();
  }

  /** Attempts pending reconnections, then delegates to the parent implementation. */
  @Override
  public ExceptionFactory getExceptionFactory() {
    reconnectIfNeeded();
    return super.getExceptionFactory();
  }

  /** Attempts pending reconnections, then delegates to the parent implementation. */
  @Override
  public HostAddress getHostAddress() {
    reconnectIfNeeded();
    return super.getHostAddress();
  }

  /**
   * Indicates whether the host address reported by {@link #getHostAddress()} is flagged as
   * primary.
   *
   * @return the {@code primary} flag of the current host address
   */
  public boolean isPrimary() {
    return getHostAddress().primary;
  }

  /** Resets the state flag and the prepare cache of both underlying connections, when present. */
  @Override
  public void reset() {
    if (replicaClient != null) {
      replicaClient.getContext().resetStateFlag();
      replicaClient.getContext().resetPrepareCache();
    }
    if (primaryClient != null) {
      primaryClient.getContext().resetStateFlag();
      primaryClient.getContext().resetPrepareCache();
    }
  }
}