blob: 6f90c1ddcb2f77a524ff3c813763736004becc7b [file] [log] [blame]
// SPDX-License-Identifier: LGPL-2.1-or-later
// Copyright (c) 2012-2014 Monty Program Ab
// Copyright (c) 2015-2021 MariaDB Corporation Ab
package org.mariadb.jdbc.client.impl;
import java.sql.*;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executor;
import java.util.concurrent.locks.ReentrantLock;
import org.mariadb.jdbc.Configuration;
import org.mariadb.jdbc.HostAddress;
import org.mariadb.jdbc.Statement;
import org.mariadb.jdbc.client.Client;
import org.mariadb.jdbc.client.Completion;
import org.mariadb.jdbc.client.Context;
import org.mariadb.jdbc.client.context.RedoContext;
import org.mariadb.jdbc.export.ExceptionFactory;
import org.mariadb.jdbc.export.Prepare;
import org.mariadb.jdbc.message.ClientMessage;
import org.mariadb.jdbc.message.client.ChangeDbPacket;
import org.mariadb.jdbc.message.client.QueryPacket;
import org.mariadb.jdbc.message.client.RedoableWithPrepareClientMessage;
import org.mariadb.jdbc.util.constants.ConnectionState;
import org.mariadb.jdbc.util.constants.ServerStatus;
import org.mariadb.jdbc.util.log.Logger;
import org.mariadb.jdbc.util.log.Loggers;
/**
 * Client for multi-primary (multi-master) topologies: when the current connection fails, it
 * transparently reconnects to another host whenever possible.
 *
 * <p>Remark: a dynamic proxy would have been a better fit, but proxy classes are not supported by
 * AOT compilation, so they are deliberately avoided here.
 */
public class MultiPrimaryClient implements Client {
  private static final Logger logger = Loggers.getLogger(MultiPrimaryClient.class);

  /**
   * Temporarily denied hosts. The value is the time of the failure plus {@link
   * #deniedListTimeout}, in milliseconds. The map is static, hence shared by all instances.
   */
  protected static final ConcurrentMap<HostAddress, Long> denyList = new ConcurrentHashMap<>();

  /** Duration (ms) added to the current time when a host is put in the deny list. */
  protected final long deniedListTimeout;

  /** Configuration. */
  protected final Configuration conf;

  /** Set once the connection is explicitly closed, or when a reconnection attempt has failed. */
  protected boolean closed = false;

  /** Thread locker, handed over to every underlying client. */
  protected final ReentrantLock lock;

  /** Client currently delegated to; set to {@code null} by a failed {@link #reConnect()}. */
  protected Client currentClient;

  /**
   * Constructor. Reads the non-mapped option {@code deniedListTimeout} (default 60000) and
   * immediately connects to a primary host.
   *
   * @param conf configuration
   * @param lock thread locker
   * @throws SQLException if fail to connect
   */
  public MultiPrimaryClient(Configuration conf, ReentrantLock lock) throws SQLException {
    this.conf = conf;
    this.lock = lock;
    deniedListTimeout =
        Long.parseLong(conf.nonMappedOptions().getProperty("deniedListTimeout", "60000"));
    currentClient = connectHost(false, false);
  }

  /**
   * Creates the underlying client for a host: a {@link ReplayClient} when transaction replay is
   * configured, a {@link StandardClient} otherwise.
   */
  private Client newClient(HostAddress address) throws SQLException {
    return conf.transactionReplay()
        ? new ReplayClient(conf, address, lock, false)
        : new StandardClient(conf, address, lock, false);
  }

  /** Tells whether a host belongs to this configuration and has the requested role. */
  private boolean isCandidate(HostAddress address, boolean readOnly) {
    return conf.addresses().contains(address) && address.primary != readOnly;
  }

  /** Throws the standard "Connection is closed" error when this client has been closed. */
  private void checkNotClosed() throws SQLNonTransientConnectionException {
    if (closed) {
      throw new SQLNonTransientConnectionException("Connection is closed", "08000", 1220);
    }
  }

  /**
   * Trying connecting server.
   *
   * <p>First tries every host of the requested type that is not temporarily denied. If none
   * succeeds (and {@code failFast} is not set), retries the denied hosts, the one with the lowest
   * deny-list value first, until {@code retriesAllDown} attempts have been consumed.
   *
   * @param readOnly must connect a replica / primary
   * @param failFast must try only not denied server
   * @return a valid connection client
   * @throws SQLException if not succeed to create a connection.
   */
  protected Client connectHost(boolean readOnly, boolean failFast) throws SQLException {
    Optional<HostAddress> host;
    SQLNonTransientConnectionException lastSqle = null;
    int maxRetries = conf.retriesAllDown();

    // first pass: hosts that are not currently in the deny list
    while ((host = conf.haMode().getAvailableHost(conf.addresses(), denyList, !readOnly))
            .isPresent()
        && maxRetries > 0) {
      try {
        return newClient(host.get());
      } catch (SQLNonTransientConnectionException sqle) {
        lastSqle = sqle;
        denyList.putIfAbsent(host.get(), System.currentTimeMillis() + deniedListTimeout);
        maxRetries--;
      }
    }

    if (failFast) {
      throw (lastSqle != null)
          ? lastSqle
          : new SQLNonTransientConnectionException("all hosts are blacklisted");
    }

    // All server corresponding to type are in deny list
    // return the one with lower denylist timeout
    // (check that server is in conf, because denyList is shared for all instances)
    if (denyList.entrySet().stream().noneMatch(e -> isCandidate(e.getKey(), readOnly))) {
      throw new SQLNonTransientConnectionException(
          String.format("No %s host defined", readOnly ? "replica" : "primary"));
    }

    while (maxRetries > 0) {
      try {
        host =
            denyList.entrySet().stream()
                .sorted(Map.Entry.comparingByValue())
                .filter(e -> isCandidate(e.getKey(), readOnly))
                .findFirst()
                .map(Map.Entry::getKey);
        if (host.isPresent()) {
          Client client = newClient(host.get());
          // host is reachable again: it must not stay denied
          denyList.remove(host.get());
          return client;
        }
        maxRetries--;
      } catch (SQLNonTransientConnectionException sqle) {
        lastSqle = sqle;
        // the entry may have been removed concurrently, so make sure the host is denied again
        host.ifPresent(
            hostAddress ->
                denyList.putIfAbsent(hostAddress, System.currentTimeMillis() + deniedListTimeout));
        maxRetries--;
        if (maxRetries > 0) {
          try {
            // wait 250ms before looping through
            Thread.sleep(250);
          } catch (InterruptedException interrupted) {
            // keep retrying, but do not lose the interruption request
            Thread.currentThread().interrupt();
          }
        }
      }
    }

    // lastSqle is still null when no attempt was made at all (retriesAllDown <= 0, or denied
    // hosts were removed concurrently): never "throw null", which would raise an NPE
    if (lastSqle != null) {
      throw lastSqle;
    }
    throw new SQLNonTransientConnectionException(
        String.format("No %s host available", readOnly ? "replica" : "primary"));
  }

  /**
   * Connection loop: puts the current host in the deny list, connects to another primary host and
   * copies the previous session state to the new connection.
   *
   * <p>If the reconnection fails with a {@link SQLNonTransientConnectionException}, {@link
   * #currentClient} is set to {@code null} and this client is marked as closed.
   *
   * @return the previous client (the one whose connection failed)
   * @throws SQLException if fail to connect
   */
  protected Client reConnect() throws SQLException {
    denyList.putIfAbsent(
        currentClient.getHostAddress(), System.currentTimeMillis() + deniedListTimeout);
    logger.info("Connection error on {}", currentClient.getHostAddress());
    try {
      Client oldClient = currentClient;
      // remove cached prepare from existing server prepare statement
      oldClient.getContext().resetPrepareCache();
      currentClient = connectHost(false, false);
      syncNewState(oldClient);
      return oldClient;
    } catch (SQLNonTransientConnectionException sqle) {
      currentClient = null;
      closed = true;
      throw sqle;
    }
  }

  /**
   * Execute transaction replay if in transaction and configured for it, throw an exception if not
   *
   * <p>Does nothing when {@code oldClient} is {@code null}, or when no transaction was in progress
   * and the command can be redone.
   *
   * @param oldClient previous client
   * @param canRedo if command can be redo even if not in transaction
   * @throws SQLException if not able to replay
   */
  protected void replayIfPossible(Client oldClient, boolean canRedo) throws SQLException {
    // oldClient is only valued if this occurs on master.
    if (oldClient != null) {
      if ((oldClient.getContext().getServerStatus() & ServerStatus.IN_TRANSACTION) > 0) {
        if (conf.transactionReplay()) {
          executeTransactionReplay(oldClient);
        } else {
          // transaction is lost, but connection is now up again.
          // changing exception to SQLTransientConnectionException
          throw new SQLTransientConnectionException(
              String.format(
                  "Driver has reconnect connection after a communications link failure with %s. In"
                      + " progress transaction was lost",
                  oldClient.getHostAddress()),
              "25S03");
        }
      } else if (!canRedo) {
        // no transaction, but connection is now up again.
        // changing exception to SQLTransientConnectionException
        throw new SQLTransientConnectionException(
            String.format(
                "Driver has reconnect connection after a communications link failure with %s",
                oldClient.getHostAddress()),
            "25S03");
      }
    }
  }

  /**
   * Execute transaction replay: replays on the current client the commands stored by the
   * transaction saver of the previous client.
   *
   * <p>If the saver is flagged dirty, it is cleared and the transaction is reported as lost.
   *
   * @param oldCli previous client
   * @throws SQLException if not able to replay
   */
  protected void executeTransactionReplay(Client oldCli) throws SQLException {
    // transaction replay
    RedoContext ctx = (RedoContext) oldCli.getContext();
    if (ctx.getTransactionSaver().isDirty()) {
      ctx.getTransactionSaver().clear();
      throw new SQLTransientConnectionException(
          String.format(
              "Driver has reconnect connection after a communications link failure with %s. In"
                  + " progress transaction was too big to be replayed, and was lost",
              oldCli.getHostAddress()),
          "25S03");
    }
    ((ReplayClient) currentClient).transactionReplay(ctx.getTransactionSaver());
  }

  /**
   * Synchronized previous and new client states.
   *
   * <p>For each state flag set on the previous context (autocommit, database, network timeout,
   * read-only, transaction isolation), applies the previous value to the current client when it
   * differs.
   *
   * @param oldCli previous client
   * @throws SQLException if error occurs
   */
  public void syncNewState(Client oldCli) throws SQLException {
    Context oldCtx = oldCli.getContext();
    currentClient.getExceptionFactory().setConnection(oldCli.getExceptionFactory());

    // autocommit
    if ((oldCtx.getStateFlag() & ConnectionState.STATE_AUTOCOMMIT) > 0) {
      if ((oldCtx.getServerStatus() & ServerStatus.AUTOCOMMIT)
          != (currentClient.getContext().getServerStatus() & ServerStatus.AUTOCOMMIT)) {
        currentClient.getContext().addStateFlag(ConnectionState.STATE_AUTOCOMMIT);
        currentClient.execute(
            new QueryPacket(
                "set autocommit="
                    + (((oldCtx.getServerStatus() & ServerStatus.AUTOCOMMIT) > 0) ? "1" : "0")),
            true);
      }
    }

    // current database
    if ((oldCtx.getStateFlag() & ConnectionState.STATE_DATABASE) > 0
        && !Objects.equals(currentClient.getContext().getDatabase(), oldCtx.getDatabase())) {
      currentClient.getContext().addStateFlag(ConnectionState.STATE_DATABASE);
      if (oldCtx.getDatabase() != null) {
        currentClient.execute(new ChangeDbPacket(oldCtx.getDatabase()), true);
      }
      currentClient.getContext().setDatabase(oldCtx.getDatabase());
    }

    // socket timeout
    if ((oldCtx.getStateFlag() & ConnectionState.STATE_NETWORK_TIMEOUT) > 0) {
      currentClient.setSocketTimeout(oldCli.getSocketTimeout());
    }

    // read-only session, only sent to a non-primary host with server version >= 5.6.5
    if ((oldCtx.getStateFlag() & ConnectionState.STATE_READ_ONLY) > 0
        && !currentClient.getHostAddress().primary
        && currentClient.getContext().getVersion().versionGreaterOrEqual(5, 6, 5)) {
      currentClient.execute(new QueryPacket("SET SESSION TRANSACTION READ ONLY"), true);
    }

    // transaction isolation level
    if ((oldCtx.getStateFlag() & ConnectionState.STATE_TRANSACTION_ISOLATION) > 0
        && currentClient.getContext().getTransactionIsolationLevel()
            != oldCtx.getTransactionIsolationLevel()) {
      String query = "SET SESSION TRANSACTION ISOLATION LEVEL";
      // NOTE(review): any other level leaves the query without a level name - presumably the
      // level was validated when it was set; confirm against the caller
      switch (oldCtx.getTransactionIsolationLevel()) {
        case java.sql.Connection.TRANSACTION_READ_UNCOMMITTED:
          query += " READ UNCOMMITTED";
          break;
        case java.sql.Connection.TRANSACTION_READ_COMMITTED:
          query += " READ COMMITTED";
          break;
        case java.sql.Connection.TRANSACTION_REPEATABLE_READ:
          query += " REPEATABLE READ";
          break;
        case java.sql.Connection.TRANSACTION_SERIALIZABLE:
          query += " SERIALIZABLE";
          break;
      }
      currentClient
          .getContext()
          .setTransactionIsolationLevel(oldCtx.getTransactionIsolationLevel());
      currentClient.execute(new QueryPacket(query), true);
    }
  }

  /** Executes a message without statement, using default result-set options. */
  @Override
  public List<Completion> execute(ClientMessage message, boolean canRedo) throws SQLException {
    return execute(
        message,
        null,
        0,
        0L,
        ResultSet.CONCUR_READ_ONLY,
        ResultSet.TYPE_FORWARD_ONLY,
        false,
        canRedo);
  }

  /** Executes a message for a statement, using default result-set options. */
  @Override
  public List<Completion> execute(
      ClientMessage message, org.mariadb.jdbc.Statement stmt, boolean canRedo) throws SQLException {
    return execute(
        message,
        stmt,
        0,
        0L,
        ResultSet.CONCUR_READ_ONLY,
        ResultSet.TYPE_FORWARD_ONLY,
        false,
        canRedo);
  }

  /**
   * Executes a message on the current client. On a non-transient connection error, reconnects,
   * replays the in-progress transaction when possible, re-prepares the message if needed, then
   * executes the message again on the new connection.
   *
   * <p>A failure during a COMMIT is never retried: a {@link SQLTransientConnectionException} is
   * thrown once the connection has been re-established.
   */
  @Override
  public List<Completion> execute(
      ClientMessage message,
      Statement stmt,
      int fetchSize,
      long maxRows,
      int resultSetConcurrency,
      int resultSetType,
      boolean closeOnCompletion,
      boolean canRedo)
      throws SQLException {
    checkNotClosed();
    try {
      return currentClient.execute(
          message,
          stmt,
          fetchSize,
          maxRows,
          resultSetConcurrency,
          resultSetType,
          closeOnCompletion,
          canRedo);
    } catch (SQLNonTransientConnectionException e) {
      // read the failing host before reConnect() replaces currentClient
      HostAddress hostAddress = currentClient.getHostAddress();
      Client oldClient = reConnect();

      if (message instanceof QueryPacket && ((QueryPacket) message).isCommit()) {
        throw new SQLTransientConnectionException(
            String.format(
                "Driver has reconnect connection after a communications failure with %s during a"
                    + " COMMIT statement",
                hostAddress),
            "25S03");
      }

      replayIfPossible(oldClient, canRedo);

      if (message instanceof RedoableWithPrepareClientMessage) {
        ((RedoableWithPrepareClientMessage) message).rePrepare(currentClient);
      }
      return currentClient.execute(
          message,
          stmt,
          fetchSize,
          maxRows,
          resultSetConcurrency,
          resultSetType,
          closeOnCompletion,
          canRedo);
    }
  }

  /**
   * Executes a pipeline of messages on the current client. When the error, or its direct cause,
   * is a non-transient connection error, reconnects, replays the in-progress transaction when
   * possible, re-prepares the messages that need it, then executes the pipeline again.
   */
  @Override
  public List<Completion> executePipeline(
      ClientMessage[] messages,
      Statement stmt,
      int fetchSize,
      long maxRows,
      int resultSetConcurrency,
      int resultSetType,
      boolean closeOnCompletion,
      boolean canRedo)
      throws SQLException {
    checkNotClosed();
    try {
      return currentClient.executePipeline(
          messages,
          stmt,
          fetchSize,
          maxRows,
          resultSetConcurrency,
          resultSetType,
          closeOnCompletion,
          canRedo);
    } catch (SQLException e) {
      if (e instanceof SQLNonTransientConnectionException
          || e.getCause() instanceof SQLNonTransientConnectionException) {
        Client oldClient = reConnect();
        replayIfPossible(oldClient, canRedo);
        Arrays.stream(messages)
            .filter(RedoableWithPrepareClientMessage.class::isInstance)
            .map(RedoableWithPrepareClientMessage.class::cast)
            .forEach(
                rd -> {
                  try {
                    rd.rePrepare(currentClient);
                  } catch (SQLException ignored) {
                    // best effort: a failure to re-prepare is deliberately not propagated here
                  }
                });
        return currentClient.executePipeline(
            messages,
            stmt,
            fetchSize,
            maxRows,
            resultSetConcurrency,
            resultSetType,
            closeOnCompletion,
            canRedo);
      }
      throw e;
    }
  }

  /**
   * Reads streaming results on the current client. On a non-transient connection error, tries to
   * reconnect, then always throws: the streaming result cannot be resumed.
   */
  @Override
  public void readStreamingResults(
      List<Completion> completions,
      int fetchSize,
      long maxRows,
      int resultSetConcurrency,
      int resultSetType,
      boolean closeOnCompletion)
      throws SQLException {
    checkNotClosed();
    try {
      currentClient.readStreamingResults(
          completions, fetchSize, maxRows, resultSetConcurrency, resultSetType, closeOnCompletion);
    } catch (SQLNonTransientConnectionException e) {
      // a failed reConnect() sets currentClient to null, so the factory of the failing client
      // must be captured first to be able to report the reconnection error
      ExceptionFactory failedClientFactory = currentClient.getExceptionFactory();
      try {
        reConnect();
      } catch (SQLException e2) {
        throw failedClientFactory.create(
            "Socket error during result streaming", e2.getSQLState(), e2);
      }
      throw getExceptionFactory().create("Socket error during result streaming", "HY000", e);
    }
  }

  /**
   * Closes a prepared statement on the current client. On a non-transient connection error, only
   * reconnects: the error itself is not propagated.
   */
  @Override
  public void closePrepare(Prepare prepare) throws SQLException {
    checkNotClosed();
    try {
      currentClient.closePrepare(prepare);
    } catch (SQLNonTransientConnectionException e) {
      reConnect();
    }
  }

  /** Aborts the current client. */
  @Override
  public void abort(Executor executor) throws SQLException {
    checkNotClosed();
    currentClient.abort(executor);
  }

  /** Marks this client as closed and closes the current client, if there is one. */
  @Override
  public void close() throws SQLException {
    closed = true;
    // currentClient is null when a previous reconnection attempt has failed
    if (currentClient != null) {
      currentClient.close();
    }
  }

  /** Only checks that the connection is not closed: no other action is taken here. */
  @Override
  public void setReadOnly(boolean readOnly) throws SQLException {
    checkNotClosed();
  }

  @Override
  public int getSocketTimeout() {
    return currentClient.getSocketTimeout();
  }

  /**
   * Sets the socket timeout on the current client; on a non-transient connection error,
   * reconnects and sets it on the new connection.
   */
  @Override
  public void setSocketTimeout(int milliseconds) throws SQLException {
    checkNotClosed();
    try {
      currentClient.setSocketTimeout(milliseconds);
    } catch (SQLNonTransientConnectionException e) {
      reConnect();
      currentClient.setSocketTimeout(milliseconds);
    }
  }

  @Override
  public boolean isClosed() {
    return closed;
  }

  @Override
  public Context getContext() {
    return currentClient.getContext();
  }

  @Override
  public ExceptionFactory getExceptionFactory() {
    return currentClient.getExceptionFactory();
  }

  @Override
  public HostAddress getHostAddress() {
    return currentClient.getHostAddress();
  }

  /** Always {@code true} for this client. */
  public boolean isPrimary() {
    return true;
  }

  /** Resets the state flag and the prepare cache of the current client context. */
  @Override
  public void reset() {
    currentClient.getContext().resetStateFlag();
    currentClient.getContext().resetPrepareCache();
  }
}